Linear Regression

Problem Statement
The input data set contains details of various car models. Based on
the information provided, the goal is to build a model that predicts
the miles-per-gallon (MPG) of a given car model.
Techniques used:
1. Linear Regression (multivariate)
2. Data Imputation - replacing non-numeric values with numeric ones
3. Variable Reduction - keeping only the relevant features
# -*- coding: utf-8 -*-

import os

os.chdir("/home/cloudops/spark")
os.getcwd()   # verify the working directory

# Load the CSV file into an RDD
autoData = sc.textFile("data/auto-miles-per-gallon.csv")
autoData.cache()
# auto-miles-per-gallon.csv MapPartitionsRDD[1] at textFile

# Remove the first line (contains headers)
dataLines = autoData.filter(lambda x: "CYLINDERS" not in x)
dataLines.count()   # 398

# Convert the RDD into an RDD of Dense Vectors. As part of this exercise:
# 1. Remove unwanted columns
# 2. Change non-numeric values ("?") to numeric ones

# NOTE: Install NumPy first
# > pip3 install numpy

import math
# from pyspark.mllib.linalg import Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Use a default average HorsePower (HP) for missing values;
# this broadcast constant could also be calculated from the existing values
avgHP = sc.broadcast(80.0)
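
# A minimal sketch of deriving the default from the data itself instead
# of hard-coding 80.0 (assumes HORSEPOWER is column 3 and "?" marks
# missing values, as in the parsing code below):
# hpValues = dataLines.map(lambda x: x.split(",")[3]) \
#                     .filter(lambda hp: hp != "?") \
#                     .map(float)
# avgHP = sc.broadcast(hpValues.mean())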

def transformToNumeric(inputStr):
    global avgHP
    attList = inputStr.split(",")

    # Replace "?" values with the (broadcast) average value
    hpValue = attList[3]
    if hpValue == "?":
        hpValue = avgHP.value

    # Filter out the columns not wanted at this stage:
    # [0] MPG, [1] CYLINDERS, [3] HORSEPOWER, [5] ACCELERATION, [6] MODELYEAR
    values = Vectors.dense([float(attList[0]),
                            float(attList[1]),
                            float(hpValue),
                            float(attList[5]),
                            float(attList[6])
                            ])
    return values

# Keep only MPG, CYLINDERS, HP, ACCELERATION and MODELYEAR
autoVectors = dataLines.map(transformToNumeric)
autoVectors.collect()
# [DenseVector([18.0, 8.0, 130.0, 12.0, 70.0]),
# DenseVector([15.0, 8.0, 165.0, 11.5, 70.0]),
# DenseVector([18.0, 8.0, 150.0, 11.0, 70.0]),
# DenseVector([16.0, 8.0, 150.0, 12.0, 70.0]),
# . . .
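
# Note: collect() ships the whole RDD to the driver, which is fine for
# 398 rows; on larger data a bounded peek is safer, e.g.:
# autoVectors.take(5)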

# =====================================
# Perform Statistical Analysis
# =====================================
# NOTE: MLLib library

from pyspark.mllib.stat import Statistics

autoStats = Statistics.colStats(autoVectors)

autoStats.mean()
# array([ 23.51457286,   5.45477387, 104.10050251,  15.56809045,
#        76.01005025])

autoStats.variance()
# array([  61.08961077,    2.89341544, 1468.09062947,    7.60484823,
#         13.67244282])

autoStats.min()

autoStats.max()
# array([ 46.6,   8. , 230. ,  24.8,  82. ])

Statistics.corr(autoVectors)
# array([[ 1.        , -0.77539629, -0.77463084,  0.42028891,  0.57926713],
#       [-0.77539629,  1.        ,  0.84275215, -0.50541949, -0.3487458 ],
#       [-0.77463084,  0.84275215,  1.        , -0.68829885, -0.41559383],
#       [ 0.42028891, -0.50541949, -0.68829885,  1.        ,  0.28813695],
#       [ 0.57926713, -0.3487458 , -0.41559383,  0.28813695,  1.        ]])

# Column order: [MPG, CYLINDERS, HP, ACCELERATION, MODELYEAR]
# 0.42028891 - low correlation between MPG and ACCELERATION,
# so the ACCELERATION column (index 3) can be dropped

# =====================================
# Create SQL Context for ML Lib
# =====================================
# Transform to a Data Frame for input to Machine Learning
# Drop columns that are not required (low correlation)

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

def transformToLabeledPoint(inStr):
    ''' Element [0] (MPG) becomes the label; CYLINDERS, HP and
        MODELYEAR become the features (ACCELERATION is dropped).
    '''
    lp = (float(inStr[0]),
          Vectors.dense([inStr[1], inStr[2], inStr[4]]))
    return lp

# new RDD
autoLp = autoVectors.map(transformToLabeledPoint)

# new Data Frame from RDD (ML library, not MLLib library!!!)
autoDF = sqlContext.createDataFrame(autoLp, ["label", "features"])

# Display top 10 rows
autoDF.select("label", "features").show(10)
# +-----+----------------+
# |label|        features|
# +-----+----------------+
# | 18.0|[8.0,130.0,70.0]|
# | 15.0|[8.0,165.0,70.0]|
# | 18.0|[8.0,150.0,70.0]|
# | 16.0|[8.0,150.0,70.0]|
# | 17.0|[8.0,140.0,70.0]|
# | 15.0|[8.0,198.0,70.0]|
# | 14.0|[8.0,220.0,70.0]|
# | 14.0|[8.0,215.0,70.0]|
# | 14.0|[8.0,225.0,70.0]|
# | 15.0|[8.0,190.0,70.0]|
# +-----+----------------+
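
# Sanity check: "features" must be the ML vector type for the ML-package
# estimators used below (see the error note before model fitting):
autoDF.printSchema()
# root
#  |-- label: double (nullable = true)
#  |-- features: vector (nullable = true)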

# =====================================
# Find Correlations
# =====================================
# Number of entries in the "features" column of the first row
numFeatures = autoDF.take(1)[0].features.size   # 3

# labelRDD = autoDF.map(lambda lp: float(lp.label))
# ERROR: 'DataFrame' object has no attribute 'map'
#   Keep in mind that MLLIB is built around RDDs
# while ML is generally built around Data Frames.
#   You can't map a dataframe, but you can convert the dataframe
# to an RDD and map that by doing spark_df.rdd.map().
# Prior to Spark 2.0, spark_df.map would alias to spark_df.rdd.map().
# With Spark 2.0, you must explicitly call .rdd first.

# separate label RDD for report
labelRDD = autoDF.rdd.map(lambda lp: float(lp.label))

for i in range(numFeatures):
    featureRDD = autoDF.rdd.map(lambda lp: lp.features[i])
    corr = Statistics.corr(labelRDD, featureRDD, 'pearson')
    print('%d\t%g' % (i, corr))
# 0       -0.775396
# 1       -0.774631
# 2       0.579267
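
# Since Spark 2.2 the ML package offers a DataFrame-native equivalent
# (it reports correlations among the feature columns only, not the label):
# from pyspark.ml.stat import Correlation
# Correlation.corr(autoDF, "features").head()[0]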

# =====================================
# Split DataFrame into Training and Testing data
# =====================================
# Usually the split is 70/30, but this is a small dataset, so use 90/10
(trainingData, testData) = autoDF.randomSplit([0.9, 0.1])
trainingData.count()   # 366
testData.count()       # 32
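
# randomSplit is non-deterministic, so the counts above may vary from
# run to run; pass a seed for a reproducible split (the value 42 here
# is arbitrary):
# (trainingData, testData) = autoDF.randomSplit([0.9, 0.1], seed=42)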

# =====================================
# Build the model on Training Data
# =====================================
#        Linear Regression
# =====================================
from pyspark.ml.regression import LinearRegression

# maxIter defaults to 100; 10 iterations are enough for this small model
lr = LinearRegression(maxIter=10)
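
# Regularization could be added via the same constructor; these
# parameters exist in the ML package (the values are illustrative only):
# lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)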

# Fit model to the training data
lrModel = lr.fit(trainingData)
# ERROR:
# IllegalArgumentException: 'requirement failed: Column features must be of type
# struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually
# struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.'
#
# The two types print identically because the MLLib and ML vector UDTs
# share the same underlying struct; they are nevertheless different
# classes, and the ML estimators accept only pyspark.ml.linalg vectors.

# trainingData -> DataFrame[label: double, features: vector]

# Workaround:
# instead of
#   from pyspark.mllib.linalg import Vectors
# use:
#   from pyspark.ml.linalg import Vectors
#   from pyspark.ml.feature import VectorAssembler
# See:
#   https://spark.apache.org/docs/latest/ml-features.html#vectorassembler
# OK
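
# The linked page shows the idiomatic ML route: keep one DataFrame column
# per attribute and let VectorAssembler pack the features. A minimal
# sketch of that alternative (the column names here are illustrative,
# not taken from the CSV header):
# rowDF = sqlContext.createDataFrame(
#     autoVectors.map(lambda v: (float(v[0]), float(v[1]),
#                                float(v[2]), float(v[4]))),
#     ["MPG", "CYLINDERS", "HP", "MODELYEAR"])
# assembler = VectorAssembler(inputCols=["CYLINDERS", "HP", "MODELYEAR"],
#                             outputCol="features")
# assembledDF = assembler.transform(rowDF).withColumnRenamed("MPG", "label")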

# Multiple regression formula Y = b + a1X1 + a2X2 + a3X3
# a1, a2, a3
print("Coefficients: " + str(lrModel.coefficients))
# Coefficients: [-1.996622715254686,-0.05670662125694535,0.6372473703412905]

# b
print("Intercept: " + str(lrModel.intercept))
# Intercept: -8.121408295465546
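
# The fitted model also exposes a summary with metrics computed on the
# training data, e.g.:
# lrModel.summary.r2
# lrModel.summary.rootMeanSquaredError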

# =====================================
# Predict on the Test Data
# =====================================
predictions = lrModel.transform(testData)

predictions.select("prediction", "label", "features").show()
# prediction = predicted MPG, label = actual MPG
# +------------------+-----+----------------+
# |        prediction|label|        features|
# +------------------+-----+----------------+
# | 8.321002336144055| 10.0|[8.0,215.0,70.0]|
# |12.954662267501302| 12.0|[8.0,167.0,73.0]|
# |10.942981450478428| 12.0|[8.0,180.0,71.0]|
# |13.918674828869374| 13.0|[8.0,150.0,73.0]|
# |12.784542403730466| 13.0|[8.0,170.0,73.0]|
# . . .
# |26.795409654500315| 24.0| [4.0,75.0,74.0]|
# |28.920503714037064| 24.5| [4.0,60.0,76.0]|
# |26.795409654500315| 25.0| [4.0,75.0,74.0]|
# +------------------+-----+----------------+

# =====================================
# Evaluate the Model
# =====================================
# The "label" column holds the target values;
# metric "r2" -> R^2 (coefficient of determination)
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction",
                                labelCol="label",
                                metricName="r2")

evaluator.evaluate(predictions)
# 0.7844712339243454   -> a good model (R^2 > 0.7)
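
# The same evaluator class can report other metrics; for example RMSE,
# which is expressed in label units (MPG), re-using the predictions above:
evaluator_rmse = RegressionEvaluator(predictionCol="prediction",
                                     labelCol="label",
                                     metricName="rmse")
evaluator_rmse.evaluate(predictions)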